package com.example.service.impl;

import com.example.model.CachedHttpReqToResend;
import com.example.service.CachedHttpReqProcessor;
import com.example.service.ICachedHttpReqToResendService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
public abstract class AbstractCachedHttpReqProcessor implements CachedHttpReqProcessor {
    /**
     * 这个队列长度，有待根据业务来定
     */
    private LinkedBlockingQueue<CachedHttpReqToResend> blockingQueue = new LinkedBlockingQueue<>(500);

    private AtomicBoolean workerInited = new AtomicBoolean(false);

    Thread workerThread;

    @Autowired
    private ICachedHttpReqToResendService iCachedHttpReqToResendService;



    @Override
    public void process(Collection<CachedHttpReqToResend> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        /**
         * 直到有任务要处理时（该方法被调用时），才去初始化线程
         */
        if (workerInited.compareAndSet(false, true)) {
            workerThread = new Thread(new InnerWorker());
            workerThread.setDaemon(true);
            workerThread.setName(getThreadName());
            workerThread.start();
        }

        /**
         * 放到阻塞队列里
         */
        blockingQueue.addAll(list);
    }

    public abstract boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend);

    /**
     * 测试下事务
     * todo
     */
    /**
     * 从队列取数据；取到后，调用子类的方法去处理；
     * 子类处理后，返回处理结果
     * 根据结果，设置成功或者失败的状态
     */
    public  class InnerWorker implements Runnable {

        @Override
        public void run() {
            while (true) {
                boolean interrupted = Thread.currentThread().isInterrupted();
                if (interrupted) {
                    log.info("interrupted ,break out");
                    break;
                }

                CachedHttpReqToResend cachedHttpReqToResend;
                try {
                    cachedHttpReqToResend = blockingQueue.take();
                } catch (InterruptedException e) {
                    log.info("interrupted,e:{}", e);
                    break;
                }

                Integer reqType = cachedHttpReqToResend.getReqType();
                if (reqType == null) {
                    continue;
                }

                try {
                    /**
                     * 使用模板方法设计模式，交给子类去实现
                     */
                    boolean success = doProcess(reqType, cachedHttpReqToResend);


                    if (!success) {
                        cachedHttpReqToResend.setFailCount(cachedHttpReqToResend.getFailCount() + 1);
                    } else {
                        cachedHttpReqToResend.setCurrentState(CachedHttpReqToResend.CURRENT_STATE_SUCCESS);
                        cachedHttpReqToResend.setSuccessTime(new Date());
                    }

                    boolean count = iCachedHttpReqToResendService.updateById(cachedHttpReqToResend);
                    if (count) {
                        log.debug("update sucess");
                    }
                } catch (Throwable throwable) {
                    log.error("e:{}", throwable);
                    continue;
                }

            }

        }
    }
}
